/*
 * Copyright (c) 2014-2015 Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Apache License Version 2.0,
 * and you may not use this file except in compliance with the Apache License Version 2.0.
 * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the Apache License Version 2.0 is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
 */
package com.snowplowanalytics
package snowplow
package enrich
package common
package adapters
package registry

// Iglu
import iglu.client.{
  SchemaKey,
  Resolver
}

// Scala
import scala.util.control.NonFatal

// Scalaz
import scalaz._
import Scalaz._

// Joda-Time
import org.joda.time.DateTime

// json4s
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// This project
import loaders.{
  CollectorPayload,
  CollectorContext
}
import utils.ConversionUtils

/**
 * Transforms a Cloudfront access log into raw events
 */
object CloudfrontAccessLogAdapter {

  /**
   * Adapter for Cloudfront web distribution access log files
   */
  object WebDistribution extends Adapter {

    // JSON property names for the TSV columns, in column order, once the
    // separate date and time columns have been merged into a single "dateTime".
    // Shorter (older) log formats simply use a prefix of this list.
    private val FieldNames = List(
      "dateTime",
      "xEdgeLocation",
      "scBytes",
      "cIp",
      "csMethod",
      "csHost",
      "csUriStem",
      "scStatus",
      "csReferer",
      "csUserAgent",
      "csUriQuery",
      "csCookie",
      "xEdgeResultType",
      "xEdgeRequestId",
      "xHostHeader",
      "csProtocol",
      "csBytes",
      "timeTaken",
      "xForwardedFor",
      "sslProtocol",
      "sslCipher",
      "xEdgeResponseResultType"
    )

    // Tracker version for Cloudfront access log
    private val TrackerVersion = "com.amazon.aws.cloudfront/wd_access_log"

    /**
     * Converts a CollectorPayload instance into raw events.
     * Chooses a wd_access_log schema version based on the length of the TSV
     * Extracts the collector timestamp and IP address from the TSV
     *
     * Field conversion errors and timestamp errors are accumulated, so a
     * single bad line reports every problem found rather than only the first.
     *
     * @param payload Generated by the TsvLoader. Its body is the raw TSV.
     * @param resolver (implicit) The Iglu resolver used for
     *        schema lookup and validation. Not used
     * @return a validation boxing either a NEL of raw events or a NEL of failure strings
     */
    def toRawEvents(payload: CollectorPayload)(implicit resolver: Resolver): ValidatedRawEvents =
      payload.body match {
        case Some(p) => {
          // A limit of -1 keeps trailing empty columns, so the column count stays reliable
          val fields = p.split("\t", -1)
          val schemaVersion = fields.size match {
            case 12 => "1-0-0".successNel  // Before 12 Sep 2012
            case 15 => "1-0-1".successNel  // 12 Sep 2012
            case 18 => "1-0-2".successNel  // 21 Oct 2013
            case 19 => "1-0-3".successNel  // 29 Apr 2014
            case 23 => "1-0-4".successNel  // 01 Jul 2015
            case n => s"Access log TSV line contained $n fields, expected 12, 15, 18, 19, or 23".failNel
          }
          schemaVersion.flatMap(v => {

            // Combine the first two fields into a timestamp
            val schemaCompatibleFields = "%sT%sZ".format(fields(0), fields(1)) :: fields.toList.tail.tail

            // Runs a numeric conversion, turning a NumberFormatException into None.
            // The try deliberately covers the conversion only, so an exception
            // raised while handling some other field can never be blamed on this one.
            def parseNumber[A](raw: String)(parse: String => A): Option[A] =
              try {
                Some(parse(raw))
              } catch {
                case _: NumberFormatException => None
              }

            // Adds one (name, raw value) pair to the JSON built so far, or records
            // an error message if the value cannot be converted. Errors are
            // prepended, so the final list is in reverse field order.
            def addField(acc: (List[String], JObject), entry: (String, String)): (List[String], JObject) = {
              val (errs, json) = acc
              entry match {
                // Empty columns are kept in the JSON with a null value
                case (name, "") =>
                  (errs, json ~ (name, null))

                case ("timeTaken", field) =>
                  parseNumber(field)(_.toDouble) match {
                    case Some(seconds) => (errs, json ~ ("timeTaken", seconds))
                    case None          => ("Field [timeTaken]: cannot convert [%s] to Double".format(field) :: errs, json)
                  }

                // NOTE(review): toInt rejects values above Int.MaxValue, so very large
                // byte counts are reported as conversion errors - confirm this is intended
                case (name, field) if name == "csBytes" || name == "scBytes" =>
                  parseNumber(field)(_.toInt) match {
                    case Some(bytes) => (errs, json ~ (name, bytes))
                    case None        => ("Field [%s]: cannot convert [%s] to Int".format(name, field) :: errs, json)
                  }

                case (name, field) if name == "csReferer" || name == "csUserAgent" =>
                  ConversionUtils.doubleDecode(name, field).fold(
                    e => (e :: errs, json),
                    s => (errs, json ~ (name, s))
                  )

                case ("csUriQuery", field) =>
                  (errs, json ~ ("csUriQuery", ConversionUtils.singleEncodePcts(field)))

                case (name, field) =>
                  (errs, json ~ (name, field))
              }
            }

            // Attempt to build the json, accumulating errors from unparseable fields
            val seed: (List[String], JObject) = (Nil, JObject())
            val (errors, ueJson) = (FieldNames zip schemaCompatibleFields).foldLeft(seed)(addField)

            // Lift the accumulated error list into a validation so that it can be
            // combined with the timestamp validation below; the success value is unused
            val failures = errors match {
              case Nil => None.successNel
              case h :: t => (NonEmptyList(h) :::> t).fail // list to nonemptylist
            }

            val validatedTstamp = toTimestamp(fields(0), fields(1)).map(Some(_)).toValidationNel

            // Treats an empty column as absent
            def nonEmpty(column: String): Option[String] =
              if (column.isEmpty) None else Some(column)

            (validatedTstamp |@| failures) {(tstamp, _) =>
              // Positions below index into FieldNames: 3 = cIp, 8 = csReferer, 9 = csUserAgent
              val ip = nonEmpty(schemaCompatibleFields(3))

              val qsParams: Map[String, String] =
                nonEmpty(schemaCompatibleFields(8))
                  .map(url => Map("url" -> url))
                  .getOrElse(Map.empty[String, String])

              val userAgent = nonEmpty(schemaCompatibleFields(9)).map(ua => ConversionUtils.singleEncodePcts(ua))

              val parameters = toUnstructEventParams(
                TrackerVersion,
                qsParams,
                s"iglu:com.amazon.aws.cloudfront/wd_access_log/jsonschema/$v",
                ueJson,
                "srv"
                )
              NonEmptyList(RawEvent(
                api          = payload.api,
                parameters   = parameters,
                contentType  = payload.contentType,
                source       = payload.source,
                context      = CollectorContext(tstamp, ip, userAgent, None, Nil, None)
              ))
            }
          })
        }
        case None => "Cloudfront TSV has no body - this should be impossible".failNel
      }

    /**
     * Converts a CloudFront log-format date and
     * a time to a timestamp.
     *
     * The two values are joined into an ISO-8601 string with an explicit
     * +00:00 offset before parsing.
     *
     * @param date The CloudFront log-format date
     * @param time The CloudFront log-format time
     * @return the timestamp as a Joda DateTime
     *         or an error String, all wrapped in
     *         a Scalaz Validation
     */
    def toTimestamp(date: String, time: String): Validation[String, DateTime] =
      try {
        DateTime.parse("%sT%s+00:00".format(date, time)).success // Construct a UTC ISO date from CloudFront date and time
      } catch {
        // NonFatal lets fatal JVM errors propagate instead of becoming a bad-row message
        case NonFatal(e) =>
          "Unexpected exception converting Cloudfront web distribution access log date [%s] and time [%s] to timestamp: [%s]".format(date, time, e.getMessage).fail
      }
  }
}
